package com.wang.dmp.etl

import ch.hsr.geohash.GeoHash
import com.wang.dmp.utils.{BaiDuLBSHandler, ConfigHandler, MySQLHandler}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import scala.collection.mutable.ListBuffer

/**
 * Extracts longitude/latitude pairs from the log files and converts them into
 * business-district (商圈) tags via the Baidu LBS reverse-geocoding service,
 * keyed by geohash and persisted to MySQL.
 */
object LagLng2Business {
  /**
   * Entry point. Reads raw log parquet data, filters to coordinates inside
   * China's bounding box, reverse-geocodes each distinct (lng, lat) pair into
   * business-district tags via Baidu LBS, and batch-writes
   * (geohash, businessTags) pairs to MySQL, one batch per Spark partition.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local[*]")
    // FIX: the previous app name ("per-province data distribution stats") was
    // copy-pasted from a different job and did not describe this one.
    sparkConf.setAppName("日志经纬度转商圈信息")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo serialization

    // Create the Spark SQL context.
    val sparkContext = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sparkContext)

    // Read the raw data from the parquet path in the config file.
    val rawDataFrame = sQLContext.read.parquet(ConfigHandler.parquetPath)

    // Keep only coordinates inside China's approximate bounding box.
    // BUG FIX: the original filter's last predicate was truncated
    // ("cast(lat as double)" with no comparison — not a boolean expression),
    // and its latitude lower bound reused the longitude range (lat >= 73).
    // Mainland China spans roughly longitude 73..136 and latitude 3..54.
    rawDataFrame.select("long", "lat")
      .filter("cast(long as double) >= 73 and cast(long as double) <= 136 " +
        "and cast(lat as double) >= 3 and cast(lat as double) <= 54")
      .distinct() // de-duplicate coordinate pairs to avoid redundant LBS lookups
      .foreachPartition(iter => {

        // Buffer this partition's results so the MySQL write can be batched.
        val list = new ListBuffer[(String, String)]()
        iter.foreach(row => {
          val lat = row.getAs[String]("lat")
          val lng = row.getAs[String]("long")
          // Reverse-geocode via Baidu LBS into business-district tags.
          val business = BaiDuLBSHandler.paresBusinessTagBy(lng, lat)
          // 8-character geohash of the coordinate serves as the storage key.
          val geoHashCode = GeoHash.geoHashStringWithCharacterPrecision(lat.toDouble, lng.toDouble, 8)

          // ROBUSTNESS: guard against a null result from the remote LBS call —
          // the original called business.toString unconditionally and would NPE.
          if (business != null && StringUtils.isNotEmpty(business.toString)) {
            list.append((geoHashCode, business.toString))
          }

        })
        // Batch-write this partition's (geohash, businessTags) pairs to MySQL.
        MySQLHandler.saveBusinessTag(list)

      })

    // Release Spark resources.
    sparkContext.stop()
  }
}
